/**
 * Copyright (c) 2019 Coder League
 * All rights reserved.
 *
 * File：BaseDefaultMQPushConsumer.java
 * History:
 *         2019年7月21日: Initially created, CJH.
 */
package club.coderleague.ilsp.common.rocketmq.consumer;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;

import lombok.extern.slf4j.Slf4j;

/**
 * 消息消费者基础类。
 * 
 * @author CJH
 */
@Slf4j
public abstract class BaseDefaultMQPushConsumer {
	
	/**
	 * NameServer服务地址。
	 */
	@Value("${custom.rocketmq.namesrv-addr}")
    private String namesrvAddr;
	
	/**
	 * 消费者。
	 */
	private DefaultMQPushConsumer consumer;
	
	/**
	 * 消费者组。
	 */
	private String consumerGroup;
	
	/**
	 * 订阅主题。
	 */
	private String topic;
	
	/**
	 * 订阅表达式。
	 */
	private String subExpression;
	
	/**
	 * 构造方法。
	 * 
	 * @param consumerGroup 消费者组。
	 * @param topic 订阅主题。
	 */
	public BaseDefaultMQPushConsumer(String consumerGroup, String topic) {
		this(consumerGroup, topic, null);
	}
	
	/**
	 * 构造方法。
	 * 
	 * @param consumerGroup 消费者组。
	 * @param topic 订阅主题。
	 * @param subExpression 订阅表达式。
	 */
	public BaseDefaultMQPushConsumer(String consumerGroup, String topic, String subExpression) {
		this.consumerGroup = consumerGroup;
		this.topic = topic;
		this.subExpression = subExpression;
	}
	
	/**
	 * 初始化方法。
	 * 
	 * @author CJH 2019年7月21日
	 */
	@PostConstruct
	private void init() {
		this.consumer = new DefaultMQPushConsumer(this.consumerGroup);
		this.consumer.setNamesrvAddr(this.namesrvAddr);
		try {
			this.consumer.subscribe(this.topic, this.subExpression);
			this.consumer.registerMessageListener((MessageListenerConcurrently) (messageExts, consumeConcurrentlyContext) -> {
				try {
					for (MessageExt messageExt : messageExts) {
						log.info("消费消息：{}", messageExt.toString());
						this.messageHandle(messageExt, consumeConcurrentlyContext);
					}
				} catch (Exception e) {
					e.printStackTrace();
					return ConsumeConcurrentlyStatus.RECONSUME_LATER;
				}
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			});
			preStart(this.consumer);
			this.consumer.start();
			log.info("consumer group is the {} startup success of '{}:{}'", DefaultMQPushConsumer.class.getTypeName(), this.consumerGroup, this.topic);
		} catch (MQClientException e) {
			log.error("consumer group is the {} startup failure of '{}:{}'", DefaultMQPushConsumer.class.getTypeName(), this.consumerGroup, this.topic);
			e.printStackTrace();
		}
	}
	
	/**
	 * 停止消费者
	 * 
	 * @author CJH 2019年7月22日
	 */
	@PreDestroy
	private void destroy() {
		this.consumer.shutdown();
		log.info("consumer group is the {} shutdown success of '{}:{}'", DefaultMQPushConsumer.class.getTypeName(), this.consumerGroup, this.topic);
	}

	/**
	 * 消费者启动之前执行。
	 * 
	 * @author CJH 2019年7月21日
	 * @param consumer 消费者实例。
	 */
	protected void preStart(DefaultMQPushConsumer consumer) {}
	
	/**
	 * 消息处理。
	 * 
	 * @author CJH 2019年7月21日
	 * @param messageExt 消息扩展
	 * @param consumeConcurrentlyContext 消息上下文
	 * @throws Exception 异常信息
	 */
	protected abstract void messageHandle(MessageExt messageExt, ConsumeConcurrentlyContext consumeConcurrentlyContext) throws Exception;
}
